package com.atguigu.gmall.cdc;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Reads changes of a MySQL table with Flink CDC, using the SQL (Table API) approach.
 *
 * <p>The program registers a dynamic table {@code user_info} backed by the {@code mysql-cdc}
 * connector and prints the result of {@code select * from user_info}.
 *
 * <p>Author: Felix
 * <br>Date: 2021/11/26
 */
public class FlinkCDC02_SQL {

    /**
     * DDL that registers the dynamic table {@code user_info} on top of the MySQL table
     * {@code gmall0609_realtime.t_user} via the {@code mysql-cdc} connector.
     *
     * <p>NOTE(review): connection settings and credentials are hard-coded here; consider
     * supplying them from configuration outside of demo usage.
     *
     * <p>NOTE(review): some mysql-cdc connector versions appear to require a
     * {@code PRIMARY KEY (...) NOT ENFORCED} clause when incremental snapshot reading is
     * enabled -- confirm against the connector version on the classpath.
     */
    private static final String USER_INFO_DDL =
        "CREATE TABLE user_info (" +
            " id INT," +
            " name STRING," +
            " age INT" +
            ") WITH (" +
            " 'connector' = 'mysql-cdc'," +
            " 'hostname' = 'hadoop202'," +
            " 'port' = '3306'," +
            " 'username' = 'root'," +
            " 'password' = '123456'," +
            " 'database-name' = 'gmall0609_realtime'," +
            " 'table-name' = 't_user'" +
            ")";

    /**
     * Entry point: sets up the stream and table environments, registers the CDC-backed
     * table and prints every row produced by querying it.
     *
     * @param args command-line arguments (unused)
     * @throws Exception declared for compatibility with existing callers
     */
    public static void main(String[] args) throws Exception {
        // 1. Prepare the basic environment
        // 1.1 Create the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 1.2 Set the parallelism
        env.setParallelism(1);
        // 1.3 Create the table execution environment on top of the stream environment
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 2. Read the data: create the dynamic table
        tableEnv.executeSql(USER_INFO_DDL);

        // 3. Query the dynamic table and print the results.
        // executeSql(...) submits the SELECT job itself, so no env.execute() call follows:
        // no DataStream operators are registered on env, and calling env.execute() would
        // fail with "No operators defined in streaming topology. Cannot execute."
        tableEnv.executeSql("select * from user_info").print();
    }
}
